This topic uses examples to show how to use OSS Sensor, WebHDFS Sensor, Spark Operator, Hive Operator, and Bash Operator, and how to configure alerts for a DAG.

Background information

Prerequisites

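  • An EMR Studio cluster is created.

    For more information, see Create a cluster.

  • A compute cluster is bound.
    Important
    • The Bind Cluster tab shows only EMR clusters in the same VPC.
    • Only Hadoop clusters can be bound.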

Use OSS Sensor

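  • Scenario: Task B needs to read the file data.txt from OSS. However, data.txt is uploaded by another program, and the upload time is not known in advance. Before Task B runs, it must be confirmed that data.txt already exists in OSS. In this case, you can use an OSS Sensor and make Task B depend on Sensor A. When Sensor A detects data.txt in OSS, Task B runs.
  • Example: In this example, Sensor1 first checks whether the file zeppelin_etl_note.py exists in the OSS directory airflow/dags/Aliyun_Example_Dags/. If it exists, the next task, Sensor2, runs and checks whether airflow/dags/Aliyun_Example_Dags/ contains a file whose name matches zeppelin_**_not?.py. If such a file exists, the last task runs. If a sensor does not find its target file in OSS, it checks again at the configured interval until the file appears in OSS.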
    from datetime import timedelta
    
    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG
    # Operators; we need this to operate!
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    from airflow.contrib.sensors.aliyun_oss_sensor import AliyunOssKeySensor
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(2),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'dag': dag,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    }
    dag = DAG(
        'example_oss_sensor',
        default_args=default_args,
        description='a dag used to test oss sensor',
        schedule_interval=timedelta(days=1),
    )
    
    sensor1 = AliyunOssKeySensor(
        task_id='detect_target_key_1',
        # bucket_name is the name of the OSS bucket that contains the target file.
        bucket_name='chufeng-oss-ddc-hz',
        # bucket_key is the full path (object key) of the target file in the bucket.
        bucket_key='airflow/dags/Aliyun_Example_Dags/zeppelin_etl_note.py',
        dag=dag,
    )
    
    # Set wildcard_match to True to enable wildcard matching. bucket_key is then
    # treated as a pattern that is matched against the keys in the OSS bucket.
    sensor2 = AliyunOssKeySensor(
        task_id='detect_target_key_2',
        bucket_name='chufeng-oss-ddc-hz',
        # An asterisk (*) matches one or more characters. A question mark (?) matches a single character.
        bucket_key='airflow/dags/Aliyun_Example_Dags/zeppelin_**_not?.py',
        # Defaults to False. Set it to True to use wildcard matching.
        wildcard_match=True,
        dag=dag,
    )
    
    task = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag,
    )
    
    sensor1 >> sensor2 >> task
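You can check the wildcard pattern used by sensor2 locally before you deploy the DAG. The following is a minimal sketch. It assumes that wildcard_match behaves like Python's fnmatch module. The EMR Studio sensor implementation may differ, so confirm the behavior against the actual sensor. The object keys in the listing are hypothetical.

```python
from fnmatch import fnmatchcase

# Pattern taken from sensor2 above.
PATTERN = "airflow/dags/Aliyun_Example_Dags/zeppelin_**_not?.py"


def matching_keys(keys, pattern=PATTERN):
    """Return the keys that match pattern under (assumed) fnmatch semantics.

    fnmatchcase is case-sensitive on every platform, like OSS object keys.
    """
    return [key for key in keys if fnmatchcase(key, pattern)]


if __name__ == "__main__":
    # Hypothetical listing of objects in the bucket.
    keys = [
        "airflow/dags/Aliyun_Example_Dags/zeppelin_etl_note.py",   # "?" matches "e"
        "airflow/dags/Aliyun_Example_Dags/zeppelin_etl_notes.py",  # extra "s" before ".py"
        "airflow/dags/Aliyun_Example_Dags/zeppelin_note.py",       # no "_not" after the "*"
    ]
    print(matching_keys(keys))
```

Only the first key matches. This is the situation in which sensor2 would succeed.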

Use WebHDFS Sensor

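  • Scenario: Task B needs to read the file a.txt from HDFS. However, a.txt is uploaded by another program, and the upload time is not known in advance. Before Task B runs, it must be confirmed that a.txt already exists in HDFS. In this case, you can use a WebHDFS Sensor and make Task B depend on Sensor A. When Sensor A detects a.txt in HDFS, Task B runs.
  • Example: In this example, Sensor1 checks whether a.txt exists in HDFS. If it exists, the next task runs. If it does not, the sensor checks again at the configured interval until the file appears in HDFS.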
    from datetime import timedelta
    
    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG
    # Operators; we need this to operate!
    from airflow.operators.bash_operator import BashOperator
    from airflow.sensors.web_hdfs_sensor import WebHdfsSensor
    from airflow.utils.dates import days_ago
    
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    
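    # The DAG must be associated with a Hadoop cluster. Otherwise, tasks may fail.
    # A cluster_id specified in the sensor takes precedence. If the sensor does not specify one, the cluster_id in the DAG's default_args is used.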
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(2),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        'cluster_id': "C-8A9CAA9E4440****",
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'dag': dag,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    }
    dag = DAG(
        'example_web_hdfs_sensor',
        default_args=default_args,
        description='a dag used to test webhdfs sensor',
        schedule_interval=timedelta(days=1),
    )
    
    sensor1 = WebHdfsSensor(
        # Full HDFS path of the file to wait for.
        filepath="/a.txt",
        # cluster_id="C-8A9CAA9E4440****",
        task_id='detect_target_key_1',
        dag=dag,
    )
    
    
    task = BashOperator(
        task_id='print_date',
        bash_command='date',
        dag=dag,
    )
    
    sensor1 >> task
                        
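    Note: All Airflow sensors have two parameters with default values, poke_interval and timeout. Both are in seconds. poke_interval defaults to 60 (1 minute), and timeout defaults to 60*60*24*7 (1 week). If you change poke_interval, we recommend a value of at least 60. Frequent requests can reduce efficiency.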
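The following minimal sketch illustrates the note above. It does not use Airflow. It shows roughly what a WebHDFS-based sensor does on each poke: it requests the WebHDFS GETFILESTATUS operation for the path and treats HTTP 404 as "file not present yet". The loop mirrors the poke_interval and timeout behavior in simplified form, without reschedule mode or soft_fail. The helper names are hypothetical and are not part of the WebHdfsSensor API. The default port 9870 is an assumption (it is the Hadoop 3 default NameNode HTTP port), so check the port your cluster actually uses.

```python
import time
import urllib.error
import urllib.parse
import urllib.request


def getfilestatus_url(namenode_host, filepath, port=9870):
    """Build a WebHDFS GETFILESTATUS URL for filepath (host and port are assumptions)."""
    path = urllib.parse.quote(filepath.lstrip("/"))
    return "http://{}:{}/webhdfs/v1/{}?op=GETFILESTATUS".format(namenode_host, port, path)


def file_exists(url, opener=urllib.request.urlopen):
    """One poke: True if WebHDFS answers 200, False if it answers 404 (path not found)."""
    try:
        with opener(url) as response:
            return response.status == 200
    except urllib.error.HTTPError as err:
        if err.code == 404:
            return False
        raise  # Other errors (auth, server problems) should surface, not count as "missing".


def poke_until(poke, poke_interval=60, timeout=60 * 60 * 24 * 7,
               clock=time.monotonic, sleep=time.sleep):
    """Simplified sensor loop with the same defaults as Airflow sensors.

    Calls poke() every poke_interval seconds until it returns True, and raises
    TimeoutError once more than timeout seconds have elapsed.
    """
    started = clock()
    while not poke():
        if clock() - started > timeout:
            raise TimeoutError("sensor timed out after {} seconds".format(timeout))
        sleep(poke_interval)
    return True
```

For example, `poke_until(lambda: file_exists(getfilestatus_url("emr-header-1", "/a.txt")), timeout=600)` would wait up to about 10 minutes for /a.txt. Here, emr-header-1 is a hypothetical NameNode host name. The clock and sleep parameters are there so the loop can be tested without real waiting.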

Use Spark Operator

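Important
  • You must specify cluster_id for the Spark operators.

    A cluster_id specified in the operator takes precedence. If the operator does not specify one, the cluster_id in the DAG's default_args is used.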

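  • We recommend that you upload the files required by the Spark operators to the mounted OSS path and reference them in code through the oss_mount_folder variable.

    An operator may run on any worker node of the EMR Studio cluster. To avoid synchronizing data between nodes, we recommend that you upload the JAR packages and other files to be submitted to the data development storage OSS path that you set when you created the cluster. The bucket of this path is mounted under /mnt/jfs/ on every host in the EMR Studio cluster. After the upload, you can reference the files in an operator through oss_mount_folder, as shown in the sample code. oss_mount_folder is a variable that EMR Studio Airflow provides by default. Its value is the local path on the host to which the data development storage OSS path is mapped.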

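  • Scenario: There are two Spark-related operators. SparkSubmitOperator submits JAR or Python files, and SparkSqlOperator submits Spark SQL statements.
  • Example: First run the SparkPi example with spark-submit, then create a table and insert data with Spark SQL, and finally call a user-defined function (UDF).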
    from datetime import timedelta
    
    from airflow.contrib.operators.spark_sql_operator import SparkSqlOperator
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
    from airflow.models import DAG
    from airflow.models import Variable
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'cluster_id': "C-8A9CAA9E4440****",
    }
    
    dag = DAG(
        dag_id="example_spark_operator",
        default_args=args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60),
        tags=['example'],
    )
    
    oss_mount_folder = Variable.get('oss_mount_folder', default_var="")
    
    run_first = SparkSubmitOperator(
        task_id='run_first',
        dag=dag,
        application=oss_mount_folder + '/example/spark-examples_2.11-2.4.7.jar',
        java_class='org.apache.spark.examples.SparkPi',
        name='airflow-spark-submit',
        conf={'spark.memory.storageFraction': 0.6},
        executor_cores=1,
        executor_memory='1g',
        driver_memory='1g',
        # cluster_id="C-6394A97DE6D7****",
        # conn_id='spark_default',
        # files=None,
        # py_files=None,
        # archives=None,
        # driver_class_path=None,
        # jars=None,
        # packages=None,
        # exclude_packages=None,
        # repositories=None,
        # total_executor_cores=None,
        # keytab=None,
        # principal=None,
        # proxy_user=None,
        # num_executors=None,
        # status_poll_interval=1,
        # application_args=None,
        # env_vars=None,
        # verbose=False,
        # spark_binary=None,
        # cluster_id=None,
    )
    
    run_second = SparkSqlOperator(
        task_id='run_second',
        dag=dag,
        sql='''
        create database if not exists airflow;
        use airflow;
        drop table if exists test_sparksql;
        create table test_sparksql(name string);
        insert into test_sparksql values('studio');
        ''',
        name='airflow-spark-sql-1',
        conf='spark.sql.shuffle.partitions=200'
        # conn_id='spark_default',
        # total_executor_cores=None,
        # executor_cores=None,
        # executor_memory=None,
        # keytab=None,
        # principal=None,
        # master='yarn',
        # num_executors=None,
        # verbose=True,
        # yarn_queue='default',
        # cluster_id=None,
    )
    
    
    run_third = SparkSqlOperator(
        task_id='run_third',
        dag=dag,
        sql='''
        use airflow;
        add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;
        create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';
        show functions like '*udf';
        select simpleudf(name) from test_sparksql;
        ''',
        name='airflow-spark-sql-2',
        conf='spark.sql.shuffle.partitions=200'
    )
    
    run_first >> run_second >> run_third
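SparkSubmitOperator ultimately runs a spark-submit command that is built from its parameters. The following minimal sketch does not use Airflow. It shows roughly how the run_first settings map to spark-submit options. build_spark_submit_cmd is a hypothetical helper. The real hook also adds options such as --master that come from the Spark connection, so the exact command line may differ.

```python
def build_spark_submit_cmd(application, java_class=None, name=None, conf=None,
                           executor_cores=None, executor_memory=None,
                           driver_memory=None, spark_binary="spark-submit"):
    """Assemble a spark-submit argument list from operator-style settings.

    Simplified sketch: --master, --deploy-mode, --queue and the other options
    that the real hook derives from its connection are omitted.
    """
    cmd = [spark_binary]
    # Each conf entry becomes its own "--conf key=value" pair.
    for key, value in (conf or {}).items():
        cmd += ["--conf", "{}={}".format(key, value)]
    if executor_cores is not None:
        cmd += ["--executor-cores", str(executor_cores)]
    if executor_memory:
        cmd += ["--executor-memory", executor_memory]
    if driver_memory:
        cmd += ["--driver-memory", driver_memory]
    if name:
        cmd += ["--name", name]
    if java_class:
        cmd += ["--class", java_class]
    # The application JAR or .py file comes after all options.
    cmd.append(application)
    return cmd


if __name__ == "__main__":
    oss_mount_folder = "/mnt/jfs/hypothetical-folder"  # hypothetical value
    cmd = build_spark_submit_cmd(
        application=oss_mount_folder + "/example/spark-examples_2.11-2.4.7.jar",
        java_class="org.apache.spark.examples.SparkPi",
        name="airflow-spark-submit",
        conf={"spark.memory.storageFraction": 0.6},
        executor_cores=1,
        executor_memory="1g",
        driver_memory="1g",
    )
    print(" ".join(cmd))
```

The DAG above reads oss_mount_folder with an empty default. If the variable were missing, the application path would silently become /example/spark-examples_2.11-2.4.7.jar. Checking that the value is non-empty before you build paths makes this kind of failure easier to diagnose.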

Use Hive Operator

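Important
  • You must specify cluster_id for the Hive Operator.

    A cluster_id specified in the operator takes precedence. If the operator does not specify one, the cluster_id in the DAG's default_args is used.

  • We recommend that you upload the files required by the operator to the mounted OSS path.
  • Scenario: The Hive Operator submits Hive SQL.
  • Example: First create a table and insert data, then call a user-defined function (UDF).
    The example_hive_operator.py file is as follows: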
    from datetime import timedelta
    from airflow.models import DAG
    from airflow.operators.hive_operator import HiveOperator
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'cluster_id': "C-8A9CAA9E4440****",
    }
    
    dag = DAG(
        dag_id="example_hive_operator",
        default_args=args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60),
        tags=['example'],
    )
    
    run_first = HiveOperator(
        task_id='run_first',
        dag=dag,
        hql='''
        create database if not exists airflow;
        use airflow;
        drop table if exists test_hive;
        create table test_hive(name string);
        insert into test_hive values('studio');
        ''',
        hiveconfs={'hive.execution.engine': 'tez'},
        mapred_job_name='airflow-hive-sql-1',
        # hive_cli_conn_id="hiveserver2_default",
        # cluster_id="C-8A9CAA9E4440****",  # Optional: run on a different cluster. If omitted, the DAG's cluster is used.
        # mapred_queue=None,
        # mapred_queue_priority=None,
        # hiveconf_jinja_translate=False,
        # script_begin_tag=None,
        # run_as_owner=False,
    )
    
    run_second = HiveOperator(
        task_id='run_second',
        dag=dag,
        hql='''
        use airflow;
        add jar oss://emr-studio-example/hive-udf-1.0-SNAPSHOT.jar;
        create temporary function simpleudf AS 'com.aliyun.emr.hive.udf.SimpleUDFExample';
        show functions like '*udf';
        select simpleudf(name) from test_hive;
        ''',
        hiveconfs={'hive.execution.engine': 'tez'},
        mapred_job_name='airflow-hive-sql-2',
    )
    
    
    run_first >> run_second                  
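The following sketch illustrates the precedence rule in the Important note above: the operator's own value comes first, and default_args is the fallback. It is a simplified model that assumes cluster_id is merged like any other default_args entry, where an argument passed explicitly to an operator overrides the DAG-level default. resolve_cluster_id is a hypothetical helper and is not part of EMR Studio Airflow.

```python
def resolve_cluster_id(operator_kwargs, default_args):
    """Return the cluster_id an operator would use (simplified model).

    Values passed explicitly to the operator override the DAG's default_args.
    """
    merged = dict(default_args)
    merged.update(operator_kwargs)
    cluster_id = merged.get("cluster_id")
    if not cluster_id:
        raise ValueError("cluster_id must be set on the operator or in default_args")
    return cluster_id


if __name__ == "__main__":
    dag_defaults = {"owner": "airflow", "cluster_id": "C-8A9CAA9E4440****"}
    # run_first and run_second above set no cluster_id, so they inherit the DAG's.
    print(resolve_cluster_id({"task_id": "run_first"}, dag_defaults))
    # Passing cluster_id to the operator overrides the DAG-level value.
    print(resolve_cluster_id({"task_id": "run_other", "cluster_id": "C-6394A97DE6D7****"}, dag_defaults))
```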

Use Bash Operator

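Important
  • If the Bash Operator runs Hadoop, HDFS, or similar commands, you must specify cluster_id.

    A cluster_id specified in the operator takes precedence. If the operator does not specify one, the cluster_id in the DAG's default_args is used.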

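  • We recommend that you upload the files required by the operator to the mounted OSS path and reference them in code through the oss_mount_folder variable.

    An operator may run on any worker node of the EMR Studio cluster. To avoid synchronizing data between nodes, we recommend that you upload the JAR packages and other files to be submitted to the data development storage OSS path that you set when you created the cluster. The bucket of this path is mounted under /mnt/jfs/ on every host in the EMR Studio cluster. After the upload, you can reference the files in an operator through oss_mount_folder, as shown in the sample code. oss_mount_folder is a variable that EMR Studio Airflow provides by default. Its value is the local path on the host to which the data development storage OSS path is mapped.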

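  • Scenario: The Bash Operator submits bash commands.
  • Example: First build a dataset and upload it to HDFS, then submit a MapReduce job, and finally view the result in HDFS.
    The example_bash_operator.py code is as follows: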
    from datetime import timedelta
    
    from airflow.models import DAG
    from airflow.models import Variable
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    
    args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'cluster_id': "C-8A9CAA9E4440****",
    }
    
    dag = DAG(
        dag_id='example_bash_operator',
        default_args=args,
        schedule_interval='0 0 * * *',
        dagrun_timeout=timedelta(minutes=60),
        tags=['example'],
    )
    
    oss_mount_folder = Variable.get('oss_mount_folder')
    
    run_first = BashOperator(
        task_id='run_first',
        bash_command=r'''
        hdfs dfs -mkdir -p /tmp/wordcount/input &&
        hdfs dfs -rm -r -f /tmp/wordcount/output &&
        echo -e "1201\tGopal\t45000\tTechnical manager
    1202\tManisha\t45000\tProof reader
    1203\tMasthanvali\t40000\tTechnical writer
    1204\tKiran\t40000\tHr Admin
    1205\tKranthi\t30000\tOp Admin" > /tmp/sample.txt &&
        hdfs dfs -put -f /tmp/sample.txt /tmp/wordcount/input
        ''',
        dag=dag,
    )
    
    run_second = BashOperator(
        bash_command='hadoop jar {0}/example/hadoop-mapreduce-examples-3.2.1.jar wordcount /tmp/wordcount/input /tmp/wordcount/output'.format(oss_mount_folder),
        task_id='run_second',
        dag=dag,
    )
    
    run_third = BashOperator(
        bash_command='hdfs dfs -cat /tmp/wordcount/output/part-r-00000',
        task_id='run_third',
        dag=dag,
    )
    
    
    run_first >> run_second >> run_third
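To sanity-check the output that run_third prints, the following local sketch recomputes a word count over the same five records that run_first writes to /tmp/sample.txt. It assumes the wordcount example splits each line on whitespace. It also assumes a single reducer, so that all results land in part-r-00000, and that keys are sorted by byte order, which for this ASCII data matches Python's string sort. Treat the result as an expectation to compare against, not as output captured from a cluster.

```python
from collections import Counter

# The same records that run_first writes to /tmp/sample.txt (tab-separated).
SAMPLE = (
    "1201\tGopal\t45000\tTechnical manager\n"
    "1202\tManisha\t45000\tProof reader\n"
    "1203\tMasthanvali\t40000\tTechnical writer\n"
    "1204\tKiran\t40000\tHr Admin\n"
    "1205\tKranthi\t30000\tOp Admin\n"
)


def wordcount(text):
    """Count whitespace-separated tokens and return sorted "word<TAB>count" lines."""
    counts = Counter(text.split())
    return ["{}\t{}".format(word, counts[word]) for word in sorted(counts)]


if __name__ == "__main__":
    print("\n".join(wordcount(SAMPLE)))
```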
                        

Configure alerts for a DAG

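The following examples show how to configure alerts:
  • DAG failure alert example (dag_fail_apm)
    Import the alert callback function that matches the severity of the failure from airflow.contrib.operators.aliyun_apm_operator, and pass it as on_failure_callback. For critical failures, use apm_alert_callback_dagrun_fail_critical. For less severe, warning-level failures, use apm_alert_callback_dagrun_fail_warning. In this example, the callback is set in default_args, so it applies to every task in the DAG.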
    from airflow import DAG
    from datetime import timedelta
    from airflow.utils.dates import days_ago
    from airflow.operators.bash_operator import BashOperator
    # Import the alert callback function.
    from airflow.contrib.operators.aliyun_apm_operator import apm_alert_callback_dagrun_fail_critical
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(2),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 0,
        'retry_delay': timedelta(minutes=5),
        # Pass the alert callback function. default_args applies it to every task in the DAG.
        'on_failure_callback': apm_alert_callback_dagrun_fail_critical,
    }
    
    
    with DAG('dag_fail_apm',
             max_active_runs=5,
             schedule_interval='0 0 * * *',
             default_args=default_args) as dag:
    
        intentionally_failed_task = BashOperator(
            task_id='intentionally_failed_task',
            bash_command='echooooooooo make a mistake intentionally'
        )
    
        intentionally_failed_task
  • DAG SLA alert example (dag_sla_breach_apm)
    Import the alert callback function that matches the severity of the SLA miss from airflow.contrib.operators.aliyun_apm_operator, and pass it to the DAG's sla_miss_callback parameter. For critical SLA misses, use apm_alert_callback_miss_sla_critical. For less severe, warning-level SLA misses, use apm_alert_callback_miss_sla_warning.
    from datetime import timedelta
    # Import the SLA-miss alert callback function.
    from airflow.contrib.operators.aliyun_apm_operator import apm_alert_callback_miss_sla_warning
    
    # The DAG object; we'll need this to instantiate a DAG
    from airflow import DAG
    # Operators; we need this to operate!
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': days_ago(3),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retry_delay': timedelta(seconds=20)
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'dag': dag,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        # 'on_failure_callback': some_function,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'trigger_rule': 'all_success'
    }
    dag = DAG(
        'dag_sla_breach_apm',
        default_args=default_args,
        description='breach sla intentionally',
        schedule_interval=timedelta(days=1),
        sla_miss_callback=apm_alert_callback_miss_sla_warning
    )
    
    t2 = BashOperator(
        task_id='oversleep',
        depends_on_past=False,
        bash_command='sleep 10',
        # The SLA is 1 second and the command sleeps for 10 seconds, so the SLA is always missed.
        sla=timedelta(seconds=1),
        dag=dag
    )
    
    t2
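If you need to send alerts somewhere other than the APM callbacks above, you can also pass your own callables to the same hooks. The following minimal sketch shows the shapes involved. on_failure_callback receives the task context dictionary. sla_miss_callback is called with (dag, task_list, blocking_task_list, slas, blocking_tis). The send function and the message format are hypothetical. SimpleNamespace objects stand in for the DAG, TaskInstance, and SlaMiss objects that Airflow would normally provide.

```python
from types import SimpleNamespace


def make_failure_callback(severity, send):
    """Build an on_failure_callback that formats a message from the task
    context and hands it to send (a hypothetical delivery function)."""
    def callback(context):
        ti = context["task_instance"]
        send("[{}] {}.{} failed for {}: {}".format(
            severity, ti.dag_id, ti.task_id,
            context.get("execution_date"), context.get("exception")))
    return callback


def make_sla_miss_callback(send):
    """Build a sla_miss_callback that lists the tasks whose SLA was missed."""
    def callback(dag, task_list, blocking_task_list, slas, blocking_tis):
        missed = ", ".join(sorted({sla.task_id for sla in slas}))
        send("[WARNING] SLA missed in {}: {}".format(dag.dag_id, missed))
    return callback


if __name__ == "__main__":
    # Stand-in objects; in Airflow these come from the scheduler.
    on_failure = make_failure_callback("CRITICAL", print)
    on_failure({
        "task_instance": SimpleNamespace(dag_id="dag_fail_apm",
                                         task_id="intentionally_failed_task"),
        "execution_date": "2021-01-01T00:00:00+00:00",
        "exception": None,
    })
    on_sla_miss = make_sla_miss_callback(print)
    on_sla_miss(SimpleNamespace(dag_id="dag_sla_breach_apm"), "", "",
                [SimpleNamespace(task_id="oversleep")], [])
```

A factory that takes the severity mirrors the critical/warning split of the APM functions, so one DAG can use different severities without duplicating the formatting logic.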